package com.meta.api.flink;

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

import java.util.ArrayList;
import java.util.List;
import java.util.Random;

/**
 * @Author: xieZW
 * @Date: 2022/8/23 10:41
 * 总风低压预警
 */
public class FlinkSqlTestClickZongFengDiYa {


    public static void main(String[] args) throws Exception {

        //创建flink环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        //创建表环境
        StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(env);

        //数据源
        SingleOutputStreamOperator singleOutputStreamOperator = env.addSource(new inner())
//                .setParallelism(1)
//                .assignTimestampsAndWatermarks((AssignerWithPeriodicWatermarks<info>) WatermarkStrategy.<info>forBoundedOutOfOrderness(Duration.ZERO)
//                        .withTimestampAssigner(new SerializableTimestampAssigner<info>() {
//                            @Override
//                            public long extractTimestamp(info event, long l) {
//                                return event.getC3();
//                            }
//                        }))
                ;

        //过滤数据源
        SingleOutputStreamOperator map = singleOutputStreamOperator.map(new RichMapFunction<info, Row>() {

            @Override
            public Row map(info s) {
                Row row = new Row(6);
                row.setField(0, s.getC3());
                row.setField(1, s.getC1());
                row.setField(2, "");
                row.setField(3, "");
                row.setField(4, System.currentTimeMillis());
                row.setField(5, s.getC2());
                return row;
            }
        }).returns(new RowTypeInfo(typeInformation)).setParallelism(1);

        //上面过滤后 的 每个字段代表什么意思
        List<String> fieldList = new ArrayList<>();
        fieldList.add("event_time");
        fieldList.add("c_p1_123_0");
        fieldList.add("json_data");
        fieldList.add("process_data");
        fieldList.add("rowtime.rowtime");
        fieldList.add("train_no");

        String[] fieldArray = new String[fieldList.size()];
        fieldList.toArray(fieldArray);
        tableEnvironment.createTemporaryView("input_node_table", map, String.join(",", fieldArray));

        //注册自定义函数
        EnvironmentTools.registerFunction(tableEnvironment);

        //执行sql
        Table table = tableEnvironment.sqlQuery("SELECT t.c_p1_123_0 as node_1,tnode_2.node_2 as node_2,tnode_3.node_3 as node_3," +
                "(tnode_3.node_3) and (tnode_2.node_2) as node_4,(tnode_3.node_3) and (tnode_2.node_2) as node_5,t.event_time," +
                "t.json_data,t.process_data,t.`train_no` from input_node_table t ," +
                "( select MAX(event_time) AS event_time,UNIX_MILLIS(HOP_END(rowtime, INTERVAL '0.5' SECOND, INTERVAL '60' SECOND)) AS window_flag," +
                "`train_no`,MIN((c_p1_123_0)>=7.55) as node_2 from input_node_table GROUP BY `train_no`,HOP(rowtime, INTERVAL '0.5' SECOND, INTERVAL '60' SECOND))" +
                " tnode_2 ," +
                "( select MAX(event_time) AS event_time,UNIX_MILLIS(HOP_END(rowtime, INTERVAL '0.5' SECOND, INTERVAL '60' SECOND)) AS window_flag," +
                "`train_no`,MIN((c_p1_123_0)<=7.8) as node_3 from input_node_table GROUP BY `train_no`,HOP(rowtime, INTERVAL '0.5' SECOND, INTERVAL '60' SECOND)) " +
                "tnode_3 " +
                "where  tnode_2.window_flag = tnode_3.window_flag and " +
                "t.event_time = tnode_2.event_time and t.`train_no` = tnode_2.`train_no` and t.event_time = tnode_3.event_time " +
                "and t.`train_no` = tnode_3.`train_no` ");


        //这个能执行
//        Table table = tableEnvironment.sqlQuery("SELECT t.c_p1_123_0 as node_1,tnode_2.node_2 as node_2,tnode_3.node_3 as node_3," +
//                "(tnode_3.node_3) and (tnode_2.node_2) as node_4,(tnode_3.node_3) and (tnode_2.node_2) as node_5,t.event_time," +
//                "t.json_data,t.process_data,t.`train_no` from input_node_table t ," +
//                "( select MAX(event_time) AS event_time," +
//                "`train_no`,MIN((c_p1_123_0)>=7.55) as node_2 from input_node_table GROUP BY `train_no`)" +
//                " tnode_2 ," +
//                "( select MAX(event_time) AS event_time," +
//                "`train_no`,MIN((c_p1_123_0)<=7.8) as node_3 from input_node_table GROUP BY `train_no`) " +
//                "tnode_3 " +
//                "where   " +
//                "t.event_time = tnode_2.event_time and t.`train_no` = tnode_2.`train_no` and t.event_time = tnode_3.event_time " +
//                "and t.`train_no` = tnode_3.`train_no` ");

        TypeInformation<?>[] fieldTypes = new TypeInformation<?>[9];
        fieldTypes[0] = Types.STRING;
        fieldTypes[1] = Types.BOOLEAN;
        fieldTypes[2] = Types.BOOLEAN;
        fieldTypes[3] = Types.BOOLEAN;
        fieldTypes[4] = Types.BOOLEAN;

        fieldTypes[5] = Types.LONG;
        fieldTypes[6] = Types.STRING;
        fieldTypes[7] = Types.STRING;
        fieldTypes[8] = Types.STRING;

        //表转流
        DataStream<Tuple2<Boolean, Row>> tableStream = tableEnvironment.toRetractStream(table, Types.ROW(fieldTypes));

        //输出
        tableStream.print();

        //执行
        env.execute();
    }

    static class inner implements SourceFunction<info> {
        private Boolean running = true;

        @Override
        public void run(SourceContext<info> ctx) throws Exception {
            Random random = new Random();    // 在指定的数据集中随机选取数据
            String[] users = {"1", "2", "3", "4", "5", "6", "7", "8", "9", "10"};

            while (running) {
                int i = random.nextInt(users.length);
                info info = new info();
                info.setC1(i + "");
                info.setC2("0401");
                info.setC3(System.currentTimeMillis());
                ctx.collect(info);
                // 隔1秒生成一个点击事件，方便观测
                Thread.sleep(1000);
            }
        }

        @Override
        public void cancel() {
            running = false;
        }
    }

    static class info {
        public String c_p1_123_0;
        public String train_no;
        public long timestamp;

        public String getC1() {
            return c_p1_123_0;
        }

        public void setC1(String c1) {
            this.c_p1_123_0 = c1;
        }

        public String getC2() {
            return train_no;
        }

        public void setC2(String c2) {
            this.train_no = c2;
        }

        public long getC3() {
            return timestamp;
        }

        public void setC3(long c3) {
            this.timestamp = c3;
        }
    }


    static TypeInformation<?>[] typeInformation = new TypeInformation<?>[6];

    static {

        typeInformation[0] = Types.LONG;
        typeInformation[1] = Types.STRING;
        typeInformation[2] = Types.STRING;
        typeInformation[3] = Types.STRING;
        typeInformation[4] = Types.LONG;
        typeInformation[5] = Types.STRING;
    }
}
